Como vimos el semestre pasado en “Introducción a ciencia de datos”:
La mayor parte del tiempo dedicado a hacer un análisis de datos exitoso -70% a 80%- se nos va en hacer preprocesamiento a los datos que queremos analizar. Los datos nunca están limpios, siempre son messy por lo que siempre tenemos que limpiarlos, acomodarlos, completarlos, etc. Para grandes sets de datos esto puede complicarse más al requerir métodos computacionales para siquiera ver qué preprocesamiento se les tiene que hacer a los datos, o bien también requerimos pasar mucho tiempo en el feature engineering para encontrar las mejores variables que nos brindarán información al modelo -y bajar la maldición de la dimensionalidad-
La iteración es un proceso fundamental en ciencia de datos, el modelado y el análisis requiere de pasar muchas veces por el set de datos. Por ejemplo, los algoritmos de stochastic gradient descent y expectation maximization requieren de pasar n mil veces por los datos para llegar a converger. También cuando hacemos análisis exploratorio normalmente un query -el resultado del mismo- nos llevará a preguntarnos la siguiente pregunta -otro query-. Cuando construimos modelos pasamos muchas veces por diferentes modelos o incluso en uno mismo cambiando sus hiperparámetros para encontar el mejor modelo con los mejores parámetros -magic loop, multi-armed bandit algorithm-
La tarea de un data scientist no termina cuando tenemos el mejor modelo, termina cuando lo ponemos como una aplicación de datos que está en el mundo real alterando la realidad para la que fue creado. Hacer que esta aplicación de datos se convierta en parte de un servicio productivo requiere -normalmente- de que sea re-construido periódicamente o hasta en tiempo real -como lo están viendo en su clase de métodos analíticos-.
Dada esta característica es muy importante diferenciar entre hacer analítica de laboratorio y analítica para producción, en la primera estamos enfocados en análisis exploratorio tanto de datos como de modelos, en la segunda nos concentramos en analítica operativa haciendo que lo que desarrolamos en la analítica de laboratorio sea empaquetada de forma en que pueda informar decisiones en el mundo real. La analítica de laboratorio sucede en lenguajes como R, Octave e incluso Python -MATLAB!!!-, mientras que la analítica para producción los algoritmos o pipelines se reescriben en Python (con patrones de ingeniería de software), Java o C++ para cumplir con los tiempos de respuesta.
Claramente sería mejor que el lenguaje ocupado en el análisis de laboratorio sea el mismo que para el análisis para producción optimizando tiempos y recursos para llegar más rápido a producción y modificar la realidad… sin embargo R es lento y carece de falta de integración a los lenguajes e infraestructura que normalmente se ocupa en producción, y C++ o Java son herramientas HORRIBLES para hacer análisis exploratorio sobre todo por que no son herramientas REPL -Read Evaluate Print Loop-
Spark fue creado en el laboratorio AMPLab de la Universidad de Berkeley (2014), también forma parte del Apache Software Foundation y es considerado como la primer solución open source de procesamiento distribuido hecha para Data Scientists.
Es considerada como la evolución de MapReduce de Hadoop. MapReduce revolucionó el procesamiento de datos de gran escala al permitir procesamiento en paralelo y que al aumentar el tamaño de los data sets el aumento en recursos computacionales es casi lineal! … al aumentar el tamaño del data set podemos escalar horizontalmente para que los trabajos se completen en el mismo tiempo, es resilente a las fallas en hardware -replicación de datos-.
Spark también mantiene la escalabilidad lineal y la tolerancia a fallos, pero extiende MapReduce en 3 formas:
¿Un árbol es un DAG?
Spark complementa las mejoras que brinda la estructura de DAG con un conjunto de transformaciones que permiten al usuario expresar los procesamientos de manera más natural por lo que se pueden expresar pipelines complejos en pocas líneas de código
Spark permite tener procesamiento en memoria a través de las abstraciones Dataset y DataFrame con las que es posible que podamos materializar cualquier punto de procesamiento de un pipeline en memoria por lo que si hay steps más adelante en el pipeline que ocupen estos datos no requieren de ser reprocesados o vueltos a cargar de disco!!!. Esta característica permite que Spark sea el framework seleccionado en algoritmos iterativos que requieren de pasar varias veces sobre un set de datos.
La razón más importante para seleccionar Spark por sobre otros frameworks es que resuelve varios de los retos de ciencia de datos mencionados anteriormente:
Spark está integrado a muchas de las herramientas del ecosistema de Hadoop:
… Por cierto, Spark está desarrollado en Scala, por lo que la gente que desarrollo Spark sugiere fuertemente utilizar Scala…pero, puedes ocupar PySpark -un wraper de Python para Spark- (también puedes conectar R con Spark con la libería sparklyr)
Fuente: Spark overview
\(\rightarrow\) Nosotros veremos todos :)
Fuente: Overview Spark
Características:
Hacer un programa en Spark consiste a grandes rasgos de los siguientes 3 pasos:
Es el REPL de Spark por default para Scala, aunque tambien existe un shell para Python :) -pyspark-. Esta consola nos permite definir funciones y manipular datos, es como R sin IDE.
:help para ver todos los comandos disponibles en el shell:history permite buscar nombres de variables o funciones que han sido generadas anteriormente en la sesión:paste permite insertar en la consola algo copiado del clipboard (cualquier cosa fuerta de Spark)El objeto más importante dentro de Spark es Resilent Distributed Dataset (RDD), una abstracción que representa una colección de objetos que puede ser distribuido en varios nodos de un clúster. Un RDD representa una forma de describir los procesamientos que queremos realizar en nuestros datos como una secuencia de pasos independientes y pequeños. Hay dos maneras de crear un RDD:
\(\rightarrow\) Todo en Spark son acciones o transformaciones y solo las acciones hacen que el procesamiento distribuido se lleve a cabo -antes no!, Spark es lazy!-. Las transformaciones son las operaciones que realizamos a los datos para “modificarlos”: filtros, agregaciones, intersecciones, uniones, joins, etc. Transformations.
Una acción hace que todas las transformaciones definidas antes de la acción se ejecuten en el clúster*, algunos ejemplos de acciones son: count, collect, first, take, saveAs…, foreach, etc. Actions
En Spark un DataFrame es una abstracción construida arriba de un RDD no son semejantes a los dataframes de Python Pandas o a R principalmente porque un DataFrame en Spark representa data sets distribuidos en un clúster, no datos locales donde cada renglón está en la misma máquina -pequeña sutil diferencia-. Para trabajar con los DataFrames se ocupa el SparkSession, para trabajar directamente con los RDD se ocupa el SparkContext.
SparkSession permite tener acceso a las funciones de SQL de Spark y a trabajar directamente con el DataFrame, SparkContext permite trabajar directamente con el RDD y hacer paralelización explícita. Desde Spark 2.x trabajamos directamente con el SparkSession, aunque sigue siendo posible trabajar con el SparkContext y el RDD. Un SparkSession siempre contiene al menos un SparkContext.
Debido a que SparkSession es un wrapper al SparkContext, la única forma de acceder al SparkContext es a través del SparkSession
\(\rightarrow\) Ver en pyspark
Decíamos que las acciones generadas sobre los RDD son las que hacen que se procesen los datos en el clúster, sin embargo existen algunas acciones que hacen que las cosas se pasen del clúster al driver (recordándo la arquitectura de Spark, eso es 1 solo nodo!).
collect: Trae al driver “todo” el data set, así como en dplyr es peligroso hacer un collect() sin conocer la cantidad de datos que traeremos, también en Spark puede ser peligrosofirst: Permite traer al driver la primer observación del Dataframe que encuentre, se ocupa como un glimpse de R pero solo de 1 observación (mucho menos costosa que un collect)take: Permite traer \(n\) observaciones del dataframe al drivershow: Muestra las primeras 20 observaciones en el DataframeTambién existe una operación que es importante entender, Shuffle. Este tipo de operaciones hace que los datos se tengan que redistribuir en el clúster lo que implica copiar datos a través de los executors y máquinas. Esta operación es muy costosa y compleja porque involucra I/O a disco, serialización de datos, y I/O de red; por lo que deberemos evitar en la medida de lo posible ocupar operaciones que sabemos que generan shuffle en los datos, o por lo menos conocer cuáles operaciones generarán shuffle para tomarlas en cuenta en el performance de la aplicación.
Operaciones que generan shuffle: repartition, coalesce, todas las operaciones que tienen un ByKey en su nombre, cogroup y join
El shuffle de Spark es como el shuffle de MapReduce, se requiere de tareas map que organizarán los datos y tareas reduce que agregarán los datos, pero la forma de ejecutarlos es diferente que la de MapReduce. Internamente los resultados individuales de cada task map son almacenados en memoria hasta que ya no quepan, entonces son ordenados basados en la partición target y escritos en un solo archivo. En el lado del reduce los tasks leen los bloques que les corresponden.
Algunas operaciones son más costosas que otras porque requieren estructuras de datos más pesados en memoria que otros -reduceByKey y aggregateByKey-
El objetivo del shuffle consiste en mover datos con la misma llave a un solo executor para que ese executor realice una operación específica sobre él.
Fuente: Spark shuffle
Para ocupar Spark en AWS solo tenemos que cambiar la selección de qué tipo de aplicación crear a Spark en lugar de la CoreHadoop que antes ocupábamos, todo lo demás se mantiene igual.
Programación funcional: Paradigma de programación basada en el cálculo lambda y en el uso de funciones matemáticas donde se evita guardar el estado y datos que cambien. Por el contrario, la programación imperativa enfatiza los cambios de estado mediante la mutación de variables.
Scala (2003) se considera la evolución de Java, porque fue diseñado pensando específicamente en ser un mejor lenguaje tomando como base Java; sigue siendo un lenguaje orientado a objetos pero incluyendo las características de programación funcional, corre sobre la JVM.
Lengujes de programación funcional:
spark.read.csv("path/to/file/in/dfs")
spark.read.parquet("/path/to/file/in/dfs")
bin/pyspark --packages com.databricks:spark-avro_2.11:4.0.0spark.read.format("com.databricks.spark.avro").load("/path/to/file/in/dfs/")
spark.read.json("/path/to/file/in/dfs")
bin/pyspark --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
CSV: df.write.csv("/path/in/dfs")
Parquet: df.write.parquet("/path/in/dfs")
Avro: df.write.format("com.databricks.spark.avro").save("/path/in/dfs")
JSON: df.write.json("/path/in/dfs")
count(): Obtiene el número de observaciones que tiene el DataFramecolumns: Regresa todas las columnas de un Dataframedescribe(): Obtiene la descripción del Dataframe o de una columnaselect(): Permite seleccionar uno o más “columnas” en el Dataframedistinct(): Regresa un nuevo Dataframe que contiene observaciones únicas en el Dataframe al que se le aplica el distinctdrop(): Regresa un nuevo Dataframe que contiene las columnas que no fueron dropeadasdropDuplicates(): Regresa un nuevo Dataframe con los renglones repetidos eliminados (una sola observación se queda)dropna: Regresa un nuevo Dataframe eliminando observaciones que tengan valores nulosfillna: Reemplaza los valores nulos con valores que tu defines por columnafilter: Como el filter de dplyr, permite filtrar las observaciones de un Dataframe que cumplen cierta(s) condición(s) lógicaforeach(f): Aplica la función \(f\) a cada observación en el Dataframe, es como el sapply de dplyrgroupBy: Agrupa los elementos de un Dataframe bajo cierto criterio para que después se puedan hacer operaciones de agregación sobre estos gruposintersect: Regresa un nuevo Dataframe con las observaciones que coinciden en ambos Dataframes df.intersect(df2)join: Hace un join entre dos Dataframes sobre alguna(s) columna(s). Hay innerr, outerr, full, left y rightlimit: Limita el resultado que se regresa al driverorderBy: Regresa un nuevo Dataframe ordenando las observaciones por el criterio especificadordd: Regresa el Dataframe como un RDDsort: Regresa un nuevo Dataframe ordenado por algún criterio(s)sample: Escoge una fracción del subconjunto del Dataframe original, es posible ocupar una semilla (seed)summary: Solo desde la versión 2.3. -en AWS no hay esa versión aún-subtract: Regresa un nuevo Dataframe que contiene las observaciones de este Dataframe que no están en el otro Dataframestat: Se crea un objeto de tipo DataFrameStatFunctions a través del cual se pueden solicitar funciones estadísticas básicas: ApproxQuantile, corr, cov, avg, también se pueden ocupar window functions!!! :), etc.toDF:toJSON: Convierte el Dataframe de Spark a un RDD donde cada observación es un JSONtoPandas: Convierte el Dataframe de Spark en uno de Pandas \(\rightarrow\) ¿esto lo trae al drive?union: Regresa un nuevo Dataframe que contiene la unión de 2 Dataframeswhere: Es un alias de filterA entregar de manera individual el 20 marzo 2018 máximo 23:59:59 CST en tu carpeta alumnos/nombre_apellido/tarea_6/
Las mismas preguntas que hiciste para la tarea 5 hacerlas con Spark :)
Ejercicio 1. Con la base de datos de northwind que se encuentran en el dropbox:
reportsto, ocupa explode en tu respuesta)Ejercicio 2. Con los archivos de vuelos, aeropuertos y aerolíneas que están en el dropbox
Tu clúster se debe llamar nombre_apellido (o puedes ocupar una imagen)
¿Qué se entrega?
¿Qué se califica?
Total: 10
Debido a que estaremos ocupando un lenguaje de propósito general, las UDF en Spark realmente se ocupan más cuando quieres ocupar una función anónima sin tener que guardarla como una función normal de Python.
\(\rightarrow\) correr en consola
numbers = [1,2,3,4,5,6]
[n * 2 for n in numbers if n % 2 == 1]
El siguiente snippet de Python utiliza la función de Python (3.5.2) sorted que requiere 2 parámetros como entrada: la “cosa” a ordenar, y la “llave” con la cuál se va a ordenar la “cosa”. La función lambda en este snippet está buscando en cada elemento del diccionario name_candidates_col el valor asociado a la llave distance.
name_candidates_col.append({'name':name[1], 'col':col, 'row':row, 'distance':distance})
...
sorted(name_candidates_col, key=lambda x: x['distance'], reverse=False)
Ahora si… un ejemplo de un UDF en Spark sería algo así:
import pyspark.sql.functions as f
#definicion de UDF
vowels = f.udf(lambda x: len(re.findall('(?i)([aeiou])', x))/len(x) if (x is not None and len(x) > 0) else 0)
...
#uso de esa UDF
def custom_regex(df, col_name):
...
f.avg(vowels(df[col_name])).alias(col_name + "_avg_vowels"),
...
En este snippet estamos definiciendo una UDF llamada vowels que por dentro ocupa una función lambda que extrae las vocales de una observación, saca su longitud, la divide entre la longitud total de la observación -si y solo si la observacion tiene un valor! (longitud > 0 y not None), de otra manera regresa 0 como número de vocales-. Más adelante ocupamos esta función en la definición de otra función (esta vez sin UDF).
Zeppelin (0.7.3) es un notebook multipropósito (piénsalo como un jupyter) que permite ocupar varios framewokrs y lenguajes en el notebook para realizar análisis de datos en un mismo ambiente, su motor por detrás es Spark, por lo que tiene todas las integraciones a los diferentes elementos del ecosistema de Spark.
A través de Zeppelin es posible ralizar ingesta de datos (de HDFS y S3), análisis de datos, visualización de datos y colaboración.
Para ocupar los diferentes intérpretes dentro de Zeppelin basta con agregar antes de cualquier cosa el nombre del engine que tiene que ocupar Zeppeling para “interpretar”. Por ejemplo, para ocupar el intérprete de python hay que poner %python también se puede poner un sabor específico de python como %python.conda.
Zeppelin tiene integrado matplotlib, panda, md, shell
spark.ml es el módulo de machine learning de Spark, diseñado para realizar machine learning dentro de spark de manera escalable y “sencilla”.
Características
¿Por qué hay un
spark.mllib y un spark.ml?
En la primer versión de Spark no existía la abstracción de DataFrame -el wrapper de los RDD- y todos los algoritmos de ML desarrollados en Spark interactuaban directamente con el RDD, todas estas implementaciones se encuentran en el paquete spark.mllib -que ya está descontinuada-. Una vez que salió la versión 2 de Spark y con ella los nuevos objetos SparkSession y DataFrame los algoritmos de ML fueron modificados -algunos- para que solo tengan interacción con la abstracción DataFrame y con ello surgió la librería spark.ml que es la que utilizaremos nosotros. Aún no están todos los algoritmos de spark.mllib implementados en DataFrame, la paridad en desarrollo está esperado alcanzarse en la versión 2.2 (esa ya existe) y la implicación más grande de no ocupar la librería spark.mllib es que a partir de Spark 3.0 la librería será removida completamente de Spark para dejar lugar a solo la interacción con los DataFrames.
Anyway Para confundir más a la banda, el nombre oficial de la herramienta que ocupa Spark para ML se conoce como MLlib (╯°□°)╯︵ ┻━┻ aunque realmente se refieren a la librería spark.ml.
El diseño de los pipelines de Spark está inspirado en los pipelines de scikit-learn -los vimos en la clase de Into to DS-. Un pipeline en Spark está formado por los siguientes elementos:
DataFrame: API que ocupa los DataFrame de SparkSQL para poder agregar otros tipos de datos que pueden ser útiles para ML -vector-
Transformer: Algoritmo que transforma un DataFrame en otro DataFrame, recuerda que los DataFrame en Spark envuelven a un RDD y un RDD no puede ser modificado!. Para hacer una transformación se ocupa el método transform(). Los casos en los que ocuparemos un transform pueden ser agregar una nueva columna -por ejemplo feature engineering-, o por ejemplo una vez que se ha pasado un modelo de aprendizaje poner la respuesta final del modelo como parte del DataFrame original -etiqueta, score-.
Estimator: Algoritmos que se aplican a un DataFrame para producir un Transformer. Los estimators son los que ocupan el método fit() para poder realizar un entrenamiento, el método fit recibe como parámetro un DataFrame y devuelve un modelo -que es un transformer-. Por ejemplo: Un algoritmo de regresión lineal es un estimator que tiene su método fit a través del cual entrena el algoritmo.
\[\text{fit} \rightarrow \text{transformer}\]
\(\rightarrow\) Es importante conocer que por cada instancia de un transformer o estimator se genera un ID a través del cuál es reconocido durante todo el pipeline
transform y si el paso es un estimator se le aplica el método fit.
Por ejemplo: Si tuviéramos un texto al cuál quisieramos aplicarle un análisis de sentimiento, el pipeline podría consistir en los siguientes pasos:
* Fuente: Spark ML Guide
En Spark, un pipeline es un estimator por lo que puede hacer llamada al método
fit, al hacer esto se genera un PipelineModel -que es un transformer-. Cuando querramos ocupar modelos entrenados para producción deberemos ocupar el PipelineModel generado en el momento de entranamiento al hacer una llamada a su método transform, de esta manera todos los estimators del pipeline original son convertidos a transformer asegurándonos de que en pruebas tendremos los mismos pasos/trasnformaciones ocupados para el entrenamiento del modelo. ╭(◔ ◡ ◔)/
*Fuente: Spark ML Guide
Un Pipeline en Spark está representado como un DAG, el ejemplo anterior es un DAG lineal, pero no necesariamente deben ser lineales, basta con que cumplan las características de ser un DAG -grafo acíclico dirigido-. Es por esta razón que cada instanciación de un transformer o estimator tiene asociado un ID y debe ser único, si necesitaramos un mismo transformer en el pipeline requerimos de generar otro transformer -aunque tenga el mismo código- :( (ya sé! esto medio que le da en la ma al principio de reuso pero … por el momento así se resuelve en Spark en pro de tener un pipeline), Spark revisa en tiempo de ejecución revisa que no se rompa “algo” antes de correr el pipeline -lazy-
Param que es un parámetro nombrado con documentación auto contenida en un ParamMap -diccionario de parámetro, valor-.En Spark hay dos maneras de pasar parámetros a los algoritmos de ML:
ParamMap con los parámetros y sus valores a través de fit o transform, si se envían parámetros de esta manera se hace override a los específicados vía settersLo lindo de estos objetos es que cada definición dentro del ParamMap es “atado” a un estimator o transformer en específico -a través el ID antes mencionado-. Por ejemplo: si tuvieramos en un pipeline dos regresiones logísticas -lr1 y lr2- podríamos ocupar un ParamMap que establezca es valor de las iteraciones máximas de cada regresión:
#pyspark
ParamMap{lr1.maxIter: 10, lr2.maxIter: 20} ¿Para qué tomarse la molestia de hacer estructuras y procesos para que Spark pueda manejar pipelines de ML?
╭(◔ ◡ ◔)/
Desde Spark 1.6 se agregó la posibilidad de guardar modelos y pipelines implementados en Spark para poder ocuparlos después, pero no todos los algoritmos de spark.ml tienen esta posibilidad -tiene que ver con la paridad que mencionamos al principio de la libería spark.mllib y la spark.ml-, por lo que se requiere revisar la documentación específica de cada algoritmo y ver si se puede y cómo. ¯\(ツ)/¯
El paquete spark.ml.util tiene los objetos MLWriter y MLReader que permiten guardar y cargar modelos sin importar el lenguaje en el que se hayan implementado -Scala, Java, Python o R (implementaciones especificamente para Spark!)-
Pipeline: Spark ML API
save(path) para poder guardar un pipeline en el path indicado, es un shortcut a write.save(path)load(path) para poder cargar un pipeline, es un shortcut a read.load(path)Modelos:
Los modelos que tengan posibilidad de ser guardados tendrán los métodos de save(path) y load(path) (verificar documentación del API para el modelo ocupado)
#pyspark
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
# En Spark se puede crear un DataFrame de un RDD, de una lista o de un DataFrame de Pandas,
# aquí lo estamos creando con una lista que contiene tuplas de (label, features)
# tal cual lo hacíamos en sklearn, y le estamos agregando los nombres de cada columna.
# Vectors.dense recibe una lista como parámetro
# Este DataFrame lo estamos ocupando como nuestro set de entrenamiento mock!
training = spark.createDataFrame([
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
# Como lo hacíamos en sklearn, primero instanciamos el modelo que quremos
# ocupar con los parámetros que nosotros queremos tener para este
# modelo en particular --configuramos el modelo--
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Veamos la documentacion del modelo y que parametros le pusimos a nuestra
# configuracion
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
# Ocupemos el modelo que configuramos para entrenar con lo datos que
# creamos en el DataFrame training
model_1 = lr.fit(training)
# model_1 es un transfomer creado a traves de un estimador (LogisticRegression)
print("Model 1 was fit using parameters: ")
# aqui estamos obteniendo la configuracion con la que se entreno
# la regresion logistica que ocupamos
print(lr.extractParamMap())
# Tambien podemos especificar los parametros con los que queremos que
# corra el modelo utilizando el diccionario de ParamMap
# Creamos un diccionario -se puede llamar como quieras!- que tenga
# como llave el nombre del parametro que quieres modificar, con el valor
# correspondiente.
param_map = {lr.maxIter: 20}
# Si el valor ya existe en el diccionario puedes actualizarlo
param_map[lr.maxIter] = 30
# Tambien puedes actualizar varios parametros del diccionario al mismo tiempo
param_map.update({lr.regParam: 0.1, lr.threshold: 0.55})
# Se pueden combinar diferentes diccionarios...realmente puedes tener
# un solo diccionario con los parametros de diversos modelos que ocupes en el
# pipeline sin ningun problema, pues el valor asociado es por objeto (ID)
# aqui estamos cambiando el nombre de la columna que guarda la salida del
# modelo, por default se llama 'probability' -> verificar documentacion del
# metodo
param_map_2 = {lr.probabilityCol: "my_probability"}
param_map_combined = param_map.copy()
param_map_combined.update(param_map_2)
#puedes ver el contenido del diccionario con param_map_combined.items() -> python 3.5.2
# Entrenemos una segunda regresion logistica con los nuevos parametros que
# establecimos a traves del paramMap
# En este fit estamos enviando tanto los datos como los parametros a ocupar en el
# modelo de regresion logistica
model_2 = lr.fit(training, param_map_combined)
print("Model 2 was fit using parameters: ")
# aqui queremos ver cono que parametros se quedo configurado el modelo
# que ocupamos para entrenar
print(lr.extractParamMap())
# Creemos el data frame que tendra los datos de prueba mock!
test = spark.createDataFrame([
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
# Ahora si, hagamos predicciones sobre el set de pruebas ocupando el cerebro
# antes entrenado utilizando el transform
prediction = model_2.transform(test)
# la respuesta es un DataFrame (la salida de un transform en su DataFrame)
# por lo que podemos aplicarle los metodos de SparkSQL :)
# verificamos que si es un DataFrame...
type(prediction)
# veamos que columnas tiene este DataFrame (como el names de R)
prediction.columns
# Aqui estamos seleccionando las columnas features, label,
# my_probability -> que es el nombre que nosotros especificamos anteriormente en
# ParamMap, y la columna prediction que es el nombre por default que regresa
# el modelo al parametro 'predictionCol' -> ver documentacion
# el collect hara que se regresen los resultados al drive!!!
result = prediction.select("features", "label", "my_probability", "prediction") \
.collect()
for row in result:
print("features={}, label={} -> prob={}, prediction={}".format( \
row.features, row.label, row.my_probability, row.prediction))
Pasaron demasiadas chivas como para quedarse quietesitos… ¡Hagámoslo! = Hazlo :P
¿Qué demonios pasó aquí?
#pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# al igual que en el ejemplo anterior, creamos un dataframe a traves
# de una lista con los datos de entrenamiento, la lista esta formada
# por tuplas (id, texto, label). Esta forma no nos servirá para poder meterla
# en los objetos de ML, pero mas adelante arreglaremos esto
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# Definimos los transformers: Tokenizer y HashingTF, y los
# etimators: LogisticRegression que ocuparemos. Nota que aqui no hemos hecho
# ningun fit todavia... la magia vendra mas adelante ;)
# Tokenizer convierte el string de entrada (inputCol) a minusculas y separa en
# palabras utilizando ocmo separador el espacio
tokenizer = Tokenizer(inputCol="text", outputCol="words")
# HashingTF permite hashear cada palabra utilizando MurmurHash3 convirtiendo
# el hash generado en el indice a poner en el "TDM". Este metodo optimiza el
# tiempo para generar el TDM de TF-IDF "normal". Para evitar colisiones en
# la conversion a hash se aumenta el numero de bucket -se recomienda ocupar
# potencias de 2 para balancear las cubetas-
# Nota que en este transformet estamos ocupando como entrada la salida del
# transformer Tokenizer
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
# Ocuparemos una regresion logistica de nuex
lr = LogisticRegression(maxIter=10, regParam=0.001)
# Aqui viene lo bonito... definimos un pipeline que tiene como etapas/pasos
# primero el tokenizer, luego el hasing y luego la regresion logistica. Aqui
# estamos definiendo el flujo de procesamiento, el DAG!
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Voila, solo se requiere de hacer fit al pipeline para que esto funcione
# como un pipeline, siguiendo el orden de los pasos establecidos en la
# definicion del pipeline :) ... recuerda que el fit solo es como
# el entrenamiento una vez que ya definimos las configuraciones de
# los objetos que ocuparemos (transformers y estimators)
model = pipeline.fit(training)
# Creamos el dataframe de pruebas mock! -> Nota que aqui no hay
# label!!!! (asi funcionaria en produccion cierto!)
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])
# Lixto, "ejecutamos" el pipeline haciendo un transform al pipeline para
# obtener las predicciones del set de pruebas
prediction = model.transform(test)
# De nuevo, prediction es un DataFrame generado con un transformer generado
# a traves de estimadores y transformers :)
# Seleccionamos las columnas que queremos ver
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print("({}, {}) --> prob={}, prediction={}".format( \
rid, text, str(prob), prediction))
En este módulo de la librería de ML (pyspark.ml.feature) se encuentran las funciones asociadas a las siguientes acciones:
En el ejemplo del texto ocupamos métodos de feature transformantio: Tokenizer, y de LSH: HashingTF. En esta parte veremos ejemplos de los métodos más utilizados en cada una de las categorías mencionadas, esta parte para nada es exhaustiva pues Spark cuenta con muchos métodos implementados, solo es para que se den una idea de cómo se ocupan en Spark. (Spark ML feature API)
*Fuente: Spark ML Guide
Solo para recordar, TF-IDF es un algoritmo de minería de texto ocupado normalmente en problemas de IR a través del cual contando la frecuencia de aparición de una palabra en todo la colección de documentos y en la frecuencia dentro de cada documento se establece la relevancia de un documento dado un query de búsqueda.
\[tf\_idf=tf \cdot log_{10}\frac{N}{df}\]
En este ejemplo, TF-IDF se ocupa como una transformación a una variable raw -las palabras- para se ocupadas como un feature en otro algoritmo de aprendizaje de máquina.
#pyspark
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
# Creamos nuestro set de entrada para formar la TDM
sentence_data = spark.createDataFrame([
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
], ["label", "sentence"])
# Ocupamos el transformer Tokenizer para separar por palabras
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# Aqui no hay train! porque no estamos entrenando nanda... estamos en un problema
# de IR. Tokenizer no tiene un metodo fit -no hay entrenamiento-
words_data = tokenizer.transform(sentence_data)
# Ocupamos el transformer CountVectorizer para generar una matriz de
# terminos y sus frecuencias
count_vectorizer = CountVectorizer(inputCol="words", outputCol="raw_features")
featurized_model = count_vectorizer.fit(words_data)
featurized_data = featurized_model.transform(words_data)
featurized_data.show(truncate=False)
# Ocupamos IDF para obtener el IDF de la coleccion de documentos mock que
# generamos. IDF si tiene un metodo fit a traves del cual le enviamos el set
# de tokens al que queremos obtener el IDF
idf = IDF(inputCol="raw_features", outputCol="features", minDocFreq=1)
# Aqui obtenemos el modelo a ocupar (transformer) a ocupar
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)
rescaled_data.select("label", "features").show(truncate=False)
Solo para recordar, one hot encoding transforma una variable categórica de \(n\) categorías a \(n\) variables binarias, normalmente ocupamos esta transformación para ocupar variables categóricas en algoritmos que solo ocupan representaciones numéricas -normalmente aquellos algoritmos que ocupand distancias-
#pyspark
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# creamos nuestro set de datos de entrada categorico
df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
# Esta funcion agrega un id numerico a cada valor diferente de un valor categorico
# es como establecer los niveles en R de una factor pero los niveles son numericos,
# sus id. El indice se establece por orden de frecuencia (descendente), por lo que
# el indice 0 corresponde a la variable que aparece con mas frecuencia
string_indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = string_indexer.fit(df)
indexed = model.transform(df)
indexed.show()
# OneHotEncoder no tiene un fit ya que solo es un transformador
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()
Es la normalización de minería de datos \(\frac{x-min}{max-min}\)
#pyspark
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
data_frame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -1.0]),),
(1, Vectors.dense([2.0, 1.1, 1.0]),),
(2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
data_frame.show()
# Configuramos el estimator MinMaxScaler como lo necesitamos
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
# Creamos el modelo MinMaxScaler (transformer)
scaler_model = scaler.fit(data_frame)
# Transformamos los datos reescalando
scaled_data = scaler_model.transform(data_frame)
# Nota que cuando pedimos getMin y getMax lo hacemos al estimator, no al modelo
print("Features scaled to range: [{}, {}]".format(scaler.getMin(), scaler.getMax()))
scaled_data.select("features", "scaled_features").show(truncate=False)
Corresponde a la estandarización en minería de datos \(\frac{x-\mu}{\sigma}\)
#pyspark
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler
# Creamos el data frame que queremos estandarizar
data_frame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -1.0]),),
(1, Vectors.dense([2.0, 1.1, 1.0]),),
(2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
# Configuramos el estimator StandarScaler como lo necesitamos (por default
# withMean esta en False porque hace que se regrese un vector dense...
# hay que tener cuidado con eso cuando estemos manejandoo vectores sparse
scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
withStd=True, withMean=True)
# Creamos el modelo StandardScaler para los datos de entrada
scaler_model = scaler.fit(data_frame)
# Transformamos los datos
scaled_data = scaler_model.transform(data_frame)
scaled_data.show(truncate=False)
s
\(\rightarrow\) Además de los métodos que contiene este submódulo, los algoritmos más ocupados para la selección de variables se encuentran dentro del de Feature transformation: PCA, o bien son algoritmos dentro de spark.ml: DecisionTrees, RandomForests.
Este método se ocupa para variables categóricas, se realiza una prueba de independencia \(\chi^2\) (\(\chi^2\) test) para seleccionar los mejores features. Hay 3 tipos de selección que se pueden hacer:
numTopFeatures selecciona los top n features de acuerdo a la \(\chi^2\) test, es como seleccionar los features que mayor información predictiva tienenpercentile similar a numTopFeatures pero seleccionando un porcentaje en lugar de un numero fijofpr selecciona aquellos features cuyo p-value sea menor a un cierto threshold controlando los falsos positivosfrom pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
# creamos nuestro set de datos con features
df = spark.createDataFrame([
(7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
(8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
(9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
# Configuramos el estimator
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
outputCol="selectedFeatures", labelCol="clicked")
# Creamos el modelo de ChiSquare y luego ocupamos el transform para
# seleccionar los features con mas informacion
result = selector.fit(df).transform(df)
print("ChiSqSelector output with top {} features selected".format( \
selector.getNumTopFeatures()))
result.show()
En el siguiente ejemplo aplicaremos PCA a un data set de 5 variables -dimensiones- para dejarlo en uno de 3, tomando las primeras 3 componentes principales.
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
# Creamos el data frame de 5 dimensiones, nota que en este data frame estamos
# ocupando un vector de tipo sparse en donde solo ponemos la dimension del vector
# los indices donde los valores son diferentes de 0 y los valores de esos indices
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
# Configuramos el modelo
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
# Creamos el estimator PCA
model = pca.fit(df)
# Transformamos a PCA los valores quedandonos con los primeros 3 componentes
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
Cargaremos un dataset de prueba que se encuentra en formato libsvm, en este formato cada línea tiene un vector sparse con etiquetas. En un vector sparse se guarda la etiqueta seguida de los índices que tienen un valor diferente de 0 y el valor asociado label index1:value1 index2:value2 .... Por ejemplo:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Cargamos los datos de prueba
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")
# Indexamos las categorias que existen de la columna label
# y transformamos el estimator a modelo (transformer)
label_indexer = StringIndexer(inputCol="label", outputCol="indexed_label").fit(data)
# El VectorIndexer es un estimator que identifica que variables son categoricas
# definiendo como categoricas aquellas que tienen menos de 5 valores diferentes
# si la variable tiene mas de 5 valores diferentes se tomara como variable continua
feature_indexer =\
VectorIndexer(inputCol="features", outputCol="indexed_features", maxCategories=4).fit(data)
# separamos los datos en entrenamiento y pruebas (70 y 30)
(training_data, test_data) = data.randomSplit([0.7, 0.3])
# configuramos el modelo de RandomForest con los parametros que queremos
# normalmente el numero de arboles, numero de elementos minimo que un nodo puede
# tener para ser dividio (nodos hoja), algoritmo de impureza a ocupar: gini o entropia
# a diferencia de R y scikitlearn, aqui no se puede especificar el numero de
# variables a tomar en cuenta por cada arbol, spark toma el default (sqrt(n))
rf = RandomForestClassifier(labelCol="indexed_label", featuresCol="indexed_features", numTrees=10)
# Convertimos las etiquetas indexadas a sus valores originales
label_converter = IndexToString(inputCol="prediction", outputCol="predicted_label",
labels=label_indexer.labels)
# Hacemos el pieline!! :) definiendo los pasos y su orden
pipeline = Pipeline(stages=[label_indexer, feature_indexer, rf, label_converter])
# Entrenamos :) con los datos de prueba
model = pipeline.fit(training_data)
# Modificamos el estimator a transformer para hacer predicciones con los datos de prueba
# ocupando el cerebro entrenado en fit
predictions = model.transform(test_data)
# Seleccionamos las columnas de nuestro interes (los 5 primeros registros)
predictions.select("predicted_label", "label", "features").show(5)
# Ahora evaluaremos el desempenio del modelo, para ello ocupamos el
# modelo MulticlassClassificationEvaluator que se encuentra en el modulo
# pysaprk.ml.evaluation. Esta funcion permite evaluar el desempenio de un
# modelo de ML, le pasamos la columna que tiene las etiquetas (originales)
# y la etiqueta predicha por el modelo en metricName le especificamos la medida
# de desepenio en la que queremos poner atencion -> recuerda el pequenio
# detalle de solo mirar accuracy y no precision, recall (o f1 score en el
# mejor de los casos). Por el momento solo es posible obtener precision, recall y
# accuracy
evaluator = MulticlassClassificationEvaluator(
labelCol="indexed_label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = {:f}".format(1.0 - accuracy))
rfModel = model.stages
print(rfModel) # summary only
#pyspark
from pyspark.ml.clustering import KMeans
# Creamos el data frame con los datos que ocuparemos mock
dataset = spark.createDataFrame([
(0, Vectors.dense([0.0, 0.0, 0])),
(1, Vectors.dense([0.1, 0.1, 0.1])),
(2, Vectors.dense([0.2, 0.2, 0.2])),
(3, Vectors.dense([9, 9, 9])),
(4, Vectors.dense([9.1, 9.1, 9.1])),
(5, Vectors.dense([9.2, 9.2, 9.2]))], ["label", "features"])
# Configuramos el KMeans con 2 grupos
kmeans = KMeans().setK(2).setSeed(1)
# Creamos el estimator Kmeans
model = kmeans.fit(dataset)
# Vemos que tan bien o mal nos fue, obteniendo el SSE de los puntos a
# su centro
wssse = model.computeCost(dataset)
print("Within Set Sum of Squared Errors = " + str(wssse))
# Obtenemos centros
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
print(center)
En Spark es posible hacer el tuneo de hiperparámetros de un modelo específico -por ejemplo RandomForestClassifier- o si este modelo forma parte de un pipeline se puede hacer tuneo del pipeline completo :)
Hyperparameter tuning
El semestre pasado hablábamos de un grid de parámetros de búsqueda que queríamos que se probaran de manera automática, en Spark este grid se puede especificar en la función ParamGridBuilder.
Para hacer selección de modelos en Spark se requiere de los siguientes elementos:
RegressionEvaluator, BinaryClassificationEvaluator o MulticlassClassificationEvaluator-El proceso general de selección de modelos es el siguiente:
fit) con los parámetros, obtener el modelo entrenado, evaluar el desempeño del modelo utilizando el EvaluatorEn este ejemplo ocuparemos un ParamGrid para poder hace evaluación de varios modelos, ocuparemos el ejemplo de texto que habíamos ocupado antes para ejemplificar el uso de pipelines en Spark. Probaremos con 3 valores diferentes para la parte de HashingTF -3 diferentes números de cubetas-, 2 valores diferentes para la regresión logística y ocuparemos un \(k\) de 2 para el cross validation. Estos cambios nos llevarán a tener \((3*2)*2=12\) modelos diferetnes. Hacer selección de parámetros a través del grid es normalmente muy costoso pues requerimos de explorar todas las opciones que explícitamente ponemos en el mismo, existen otros algoritmos para hacer el tuneo de hiperparámetros, puedes encontrar las ligas en la sección de referencias (Hyperparameter tuning)
#pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Creamos un DataFrame con ids, texto, label
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
(8, "e spark program", 1.0),
(9, "a e c l", 0.0),
(10, "spark compile", 1.0),
(11, "hadoop software", 0.0)
], ["id", "text", "label"])
# Configuramos los elementos que formaran parte de nuestro pipeline: Tokenizer, HashingTF,
# regresion logística y pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Definimos uno de los 3 elementos que necesitamos para hacer hpyperparameter tuning
# el grid de parametros.
# Especificamos el grid que queremos explorar agregando los parametros
# que queremos modificar de cada estimator y poniendo los valores que
# queremos explorar para cada uno de ellos en una lista.
# En este grid estamos estableciendo que queremos explorar tener 10
# cubetas, 100 y 1000 (siempre ocupar valores pares para que las cubetas
# queden balanceadas); para la regresion logistica ocupamos dos parametros de
# regularizacion
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
# Definimos el estimador que queremos evaluar -recuerda que pipeline es un estimator-
# y el evaluador que ocuparemos para determinar el desempenio de cada modelo
# por default el BinaryClassificationEvaluator toma como metrica de desempenio
# el AUC -metricName-, solo hay AUC o area under precidion recall curve
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # ocupar minimo 3 - 10 normalmente!
# Cross validation tambien es un estimador, le hacemos fit
cvModel = crossval.fit(training)
# veamos cual fue el mejor modelo
cvModel.bestModel
# Creamos nuestro dataframe de pruebas
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "mapreduce spark"),
(7, "apache hadoop")
], ["id", "text"])
# Hacemos predicciones con el mejor modelos encontrado por el CrossValidator que es el
# que quedo en cvModel
prediction = cvModel.transform(test)
# seleccionamos las columnas que queremos ver y las imprimimos para ver el resultado
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
print(row)
A entregar de manera individual máximo el 24 de abril de 2018 23:59:59 CST (-0.5 por cada día de retraso) en tu carpeta alumnos/nombre_apellido/tarea_7
Con los datos que tenemos de flights queremos predecir el tiempo de retraso de salida DEPARTURE_DELAY
gridParamMap para modificar los parámetros de los algoritmos que seleccionaste, al menos deberás de tener 3 valores diferentes en 2 de los parámetros. Por ejemplo: si seleccionaste un random forest puedes modificar el número de árboles y el número de elementos mínimo para no seguir dividiendo (con 3 valores diferentes)magic loop para probar probar tus dos diferentes algoritmosbestModel)magic loop (ocupa python directo con la librería timeit)¿Qué se entrega?
magic loop (punto anterior)¿Qué se califica?
Total: 10
El módulo de Spark que permite procesar datos en live-data stream, al ser parte de spark comparte sus cualidades de ser escalable, con un throughput alto y con tolerancia a fallos en los streams.
Spark streaming puede ingerir datos de diversas fuentes -tecnologías/frameworkds de streaming-: Apache Kafka (flujos de datos en stream), Apache Flume (especializado en archivos log), AWS Kinesis , sockets TCP. Los datos procesados por Spark Streaming se pueden enviar a sistemas de archivos distribuidos, bases de datos y/o dashboards.
*Fuente: Spark streaming guide
Lo que hace Spark Streaming es recibir un stream de datos y dividirlos en batches de datos que después puedan ser procesados por cualquier elemento del ecosistema de Spark.
*Fuente: Spark streaming guide
Hay dos elementos importantes en el módulo de Spark Streaming:
SparkContext para ser creado.StreamingContext\(\rightarrow\) para crear un StremaingContext debemos de obtener el sparkContext al que está asociado nuestro sparkSession de la siguiente manera:
#pyspark
from pyspark.streaming import StreamingContext
sc = spark.sparkContext
sc
# requerimos el sparkContext de nuestro sparkSession e indicarle
# el intervalo de tiempo para hacer cortes en el streaming y generar
# los batches de datos
ss = StreamingContext(sc, 1)
ss
Una vez que ya tenemos inicializado el StreamingContext tenemos que hacer lo siguiente:
stremaing creando DStreamsstream aplicando transformaciones y operaciones sobre los DStreamsstreamingContext.start()streamingContext.awaitTermination()Se puede detener el procesamiento manualmente utilizando streamingContext.stop()
StreamingContext activo por JVM al mismo tiempo (como el SparkSession)\(\rightarrow\) Se puede ocupar un mismo SparkContext para crear diferentes StreamingContext siempre y cuando el StreamingContext anterior haya sido detenido (sin parar el SparkContext) antes de que el siguiente sea creado
Los DStream internamente representan una serie de RDDs -Resilent Distributed Datasets, recuerda que son inmutables-. Cada RDD en un DStream tiene una parte de los datos del intervalo del stream que se consumió. Cada operación que se realiza a un DStream se aplica a los RDDs que forman parte de ese DStream, ese es el beneficio más grande de los DStream, abstraen la implementación de aplicar cada operación que definimos a los RDD que lo contienen sin que nosotros nos tengamos que preocupar por lo que pasa por abajo, solo ocupamos el API de los DStream para aplicar operaciones y transformaciones y SparkStreaming se encarga de aplicarlos a todos los RDD correspondientes.
Un Input DStream es un DStream que representa los datos que vienen de una fuente de stream, cada input stream está asociado a un receiver -excepto los input stream que representan los datos de file streams-. La función de un receiver consiste en obtener los datos de la fuente de stream y guardarlos en Spark en memoria para procesarlos.
En Spark hay 2 tipos de fuentes de stream:
StreamingContext (sistemas de archivos y conexiones de sockets)Paréntesis cultural: Un socket es una forma de comunicación entre 2 máquinas -normalmente cliente/servidor-, a través de este medio de comunicación se ocupan endpoints -direcciones ip específicas- con un puerto específico -nosotros lo podemos especificar- a través del cuál el cliente estará escuchando y enviando peticiones al server, y el server estará escuchando y enviando datos al cliente. Este tipo de comunicación ocupa el protocolo TCP (redes).
*Fuente: Oracle tutorials
*Fuente: Oracle tutorials
Es posible tener varias fuentes de streaming al mismo tiempo -en paralelo-, para ello necesitamos tener suficientes cores (o hilos si corre localmente) corriendo para tener todos los DStream de entrada, los receivers y el procesamiento \(> \text{# de recievers}\)
Sockets. Nos permite hacer streaming a través de sockets utilizando el método sockeTextStream(endpoint, port) del objeto stremaing context
Archivos (file streams). Nos permite hacer streaming de archivos alamacenados en DFS: HDFS, S3, NFS, etc. Para creara un DStream de este tipo se ocupa el método textFileStream(data_directory) del objeto streaming context. Todo archivo que viva en el directorio especificado será tomado como parte del stremaing, por ello todos los archivos bajo este directorio tienen que tener la misma estructura y formato interno (por el momento Spark no sabe manejar directorios anidados). \(\rightarrow\) Los archivos no pueden ser modificados, si se siguen agregando observaciones a los archivos -append- estos no serán procesados
Streams basados en receivers propios. Ver la guía de Custom receiver guide
Cola de RDDs como stream. Muy ocupado para probar una aplicación de Spark streaming a través de una serie de RDDs encolados, cada RDD es tomado como un batch. El método para esta funcionalidad es queueOfRDDs del objeto Streaming context
Para este tipo de fuentes se require de ocupar librerías externas a Spark que requeiren de cumplir con dependencias, pero como arreglar todas las dependencias ha resultado en serios dolores de cabeza para la mayoría de íngenieros de datos, Spark ayudó generando librerías de Spark que hacen menos engorroso el mapeo de todas las dependencias, a esto se le llama linking para mapear las librerías de Spark de fuentes avanzadas a las tecnologías/frameworks que las ocupan: Kafka, Kinesis y Flume son las que en la versión de Spark 2.1 existen como implementación en pyspark.
Es posible crear un DStream para fuentes de streaming custom, para ello tenemos que implementar un user defined receiver que pueda recibir datos de la fuente custom y empujarlos a Spark. Ver Custom receiver guide
Es posible clasificar a los receivers de acuerdo a su nivel de confiabilidad -asegurarnos que no perdemos datos cuando estamos en el stream-. Los receivers que no envían una confirmación -acknowledge- de la recepción son considerados no confiables -unreliable receiver-, mientras que los que envían una confirmación de recepción son considerados confiables -reliable receiver-. La confirmación incluye enviar un mensaje a la fuente una vez que los datos han sido recibidos y guardados en Spark con replicación.
Este ejemplo consiste en tener una fuente de streaming básica -un socket- a través del cuál enviaremos texto que será procesado después por Spark para contar el número de palabras que llegan a través del stream.
Ocuparemos el comando nc -lk 9999 en nuestro “server” para decirle que estaremos escuchando por el puerto 9999 (abrimos el socket en el puerto 9999), nc es un comando de netcat que es una utilería de Unix que se ocupa para leer y escribir datos a través de la red utilizando el protocolo TCP, el -lk es una bandera que indica que estaremos escuchando (l: listening) y que forzaremos a mantenernos escuchando aún cuando la conexión actual sea completada (-k: keep).
En nuesro cliente ejecutaremos nuestro script de python para recibir el streaming a través del endpoint localhost en el puerto 9999: ./bin/spark-submit ex_1.py localhost 9999
# word_count.py
import pprint
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def count_words():
# Definimos el numero de cores que necesitamos, como este ejemplo corre local
# ocupamos el local[n] que permite dedicar n numero de hilos al input DStream,
# al receiver y al procesamiento.
sc = SparkContext("local[2]")
# definimos el intervalo de tiempo de stream (cada segundo generaremos)
# batches de DStream
streaming_context = StreamingContext(sc, 1)
# Creamos un input DStream de tipo socket -basico- con el endpoint
# localhost = 127.0.0.1 en el puerto 9999
lines = streaming_context.socketTextStream("localhost", 9999)
# de cada DStream que obtenemos queremos dividir en palabras para poder contarlas
words = lines.flatMap(lambda line: line.split(" "))
print(type(words))
# Procesamos los DStream haciendo un conteo al estilo map/reduce
pairs = words.map(lambda word: (word, 1))
word_counts = pairs.reduceByKey(lambda x, y: x + y)
# mostramos los primeros 10 elementos de cada RDD generado en el DStream en consola
word_counts.pprint()
#### Hasta este punto realmente no hemos ejecutado nada, solo hemos
# configurado todo lo que necesitamos para recibir y procesar el stream de datos
# para iniciar el procesamiento del streaming necesitamos hacer el llamada al
# metodo start() del StreamingContext
streaming_context.start()
streaming_context.awaitTermination()
if __name__ == "__main__":
count_words()map(func): Es el lapply de Spark, aplica una función a cada elemento del DStream, genera un nuevo DStream
flatMap(func): Es como el anterior solo que cada entrada puede ser mapeada a 0 o más registros de salida
filter(func): Como el filter de dplyr, genera un nuevo DStream con las observaciones qeu cumplen cierto(s) criterio(s)
repartition(numPartitions): Cambia el nivel de paralelismo en el DStream (específicamente en el que se está poniendo el repartiton) creando más o menos particiones
union(otherStream): Se genera un nuevo DStream con la unión de los elementos de 2 DStream
count(): Regresa un nuevo DStream que contiene el conteo del número de elementos en cada RDD que forma parte del DStream al que se le aplica el count
reduce(func): Regresa un nuevo DStream que agrega elementos de cada RDD en el DStream desde donde se aplica el reduce aplicando una función de agregación
countByValue(): Regresa un nuevo DStream con los conteos de frecuencia por llave que existen en todos los RDD del DStream al que se le aplica
reduceByKey(func, [numTasks]): Recibe un DStream con par numTasks tiene por default 2 cuando spark está instalado en modo local, de otra manera toma el valor que tenga el parámetro spark.default.parallelism
join(otherStream, [numTasks]): Aplica un full join a dos DStream \(\rightarrow\) recuerda que cada DStream es un stream de datos!.
cogroup(otherStream, [numTasks]): Se aplica a un part de DStream que tienen pares
transform(func): Aplica una función de transformación a todos los RDD de un DStream (genera otro RDD), es posible ocupar una transformación de las funciones que ya están implementadas en Spark o bien implementar una especia de UDF sobre transformación a realizar a los RDD que no existen a través del API.
updateStateByKey(func): Actualiza el estado de cada llave en el DStream aplicando una función al estado previo del DStream. Esta operación nos permite mantener un estado arbitrario mientras el estado actual se actualiza constantemente. Para ocupar este tipo de operación se requiere de lo siguiente:
Ejemplo: Utilizando el ejemplo de conteo de palabras agregaremos una función de actualización de estado, esta función será llamada por cada palabra donde los nuevos valores (newValues) son los 1 asociados a cada palabra y el runningCount corresponde al conteo “anterior”.
## definimos nuestra funcion de actualizacion
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
#### aplicamos la funcion de actualizacion de estado a un DStream que trae pares (word, 1)
runningCounts = pairs.updateStateByKey(updateFunction) Para ocupar
updateStateByKey se requiere de ocupar checkpoint (lo veremos más adelante).
Las operaciones window permiten aplicar transformaciones sobre una ventana de datos. Cada vez que se mueve la ventana (en este ejemplo de 3) todos los RDDs que caen dentro de la ventana son combinados para aplicarles alguna transformación u operación, en este ejemplo las operaciones son aplicadas a las 3 últimas unidades de datos.
* Fuente Spark streaming guide
Para definir una operación de ventana se requieren de 2 parámetros:
\(\rightarrow\) La definición de ambos parámetros debe ser múltiplos del intervalo definido para el batch en el DSource
Operaciones típicas
window(windowLength, slideInterval): Genera un DStream basado en los batches que forman parte de la ventanacountByWindow(windowLength, slideInterval): Regresa el conteo de elementosque forman parte de la ventana y el intervalo de deslizamiento de un streamreduceByWindow(func, windowLength, slideInterval): Genera un nuevo elemento de stream creado por agregar elementos del stream que forman parte de una ventana e intervalo de deslizamiento a través de una función de agregación \(\rightarrow\) la función debe poderse ejecutar en paralelo por lo que tiene que ser asociativa y conmutativareduceByKeyAndWindow(func, windowLength, slieInterval, [numTasks]): Igual que el reduceByKey solo que se aplica sobre una ventana indicando la longitud de la ventana y el intervalo de deslizamientoreduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): Versión más eficiente que la anterior donde el reduce por ventana se calcula de forma incremental tomando en cuenta el reduce de la ventana anterior. Para que esto pueda suceder se requiere de tener una especie de inverse reducing sobre los datos viejos que salen de la ventana… normalmente implica poner la operación inversa a la ocupada en el reduce. Puede ser que no exista una función inversa a la ocupada en el reduce :( tonz no se puede aplicar esta función siempre.countByValueAndWindow(windowLength, slideInterval, [numTasks]): Aqui el conteo se realiza por valor en la ventana -obtenemos la frecuencia por llave-. Los Dstream que forman parte de la ventana son pares Hagamos un ejemplo para entender cómo funcionan las operaciones en ventanas, modificaremos el ejemplo visto para contar palabras de los últimos 30 segundos de datos cada 10 segundos. Para poder hacer esto tendremos que aplicar un reduceByKey sobre los pares de DStream (word, 1) que se tengan de los últimos 30 segundos, esto lo haremos a través de la operación reduceByKeyAndWindow.
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)join, rightOuterJoin, leftOuterJoin y fullOuterJoinstream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))Los DStreams creados puedes ser “empujados” a otros sistemas/frameworks externos como un DFS, una base de datos, etc. Las funciones de salida que existen para los DStreams son:
print(): \(\rightarrow\) pprint en python, imprime los primeros 10 elementos del cada batch de datos en un DStream (implica bajar a drive -el nodo donde se está corriendo la aplicación de streaming-)saveAsTextFiles(prefix, [suffix]): Permite guardar el contenido de este DStream como un archivo de textosaveAsObjectFiles(prefix, [suffix]): Guarda el contenido del DStream como una secuencia de archivos u objetos serializados en Java \(\rightarrow\) No está disponible para el API de PythonsaveAsHadoopFiles(prefix, [suffix]): Guarda el contenido del Dstream como archivos de Hadoop. \(\rightarrow\) No está disponible para el API de PythonforeachRDD(func): Es la función más genérica para guardar datos, esta función permite que nosotros implementemos nuestra lógica para guardar datos a un sistema externo aplicando esta función a cada RDD dentro del DStream. Esta función es ejecutada en el driver en donde se ejecuta la aplicación de streaming que afectan el performance de los RDD que están llegando a través del stream.Para ocupar esta funcinó de manera eficiente se recomienda:
Don´t
def sendRecord(rdd):
connection = createNewConnection() # executed at the driver
rdd.foreach(lambda record: connection.send(record))
connection.close()
dstream.foreachRDD(sendRecord)En el ejemplo anterior haremos que la conexión creada desde el driver tenga que ser serializada para ser enviada al worker esto no es la forma en la que se crean conexiones … generará un error de serialización y/o inicialización.
rdd.foreachPartition para crear una conexión por RDDDon’t
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))Do
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))Los DStream son ejecutados de forma lazy a través de las operaciones de salida, los RDD también se ejecutan lazy hasta que se hagan acciones sobre el RDD. Como los DStream contienen RDD, las acciones al RDD dentro de las operaciones de salida son las que forzan a la ejecución/procesamiento de los datos que se reciben en el stream, si nuestra aplicación no tiene operaciones de salida o tiene operaciones de salida del tipo dstream.foreachRDD() sin acciones dentro de los RDD que forman parte del stream nada será ejecutado, solo recibiremos datos y los tiraremos.
Las operaciones de salida se ejecutan una a la vez en el orden definido en la aplicacinó de streaming
Podemos ocupar DataFrame SQL y todas sus funciones con datos que vienen del stream para ello necesitamos crear un SparkSession del SparkContext que está ocupando el StreamingContext.
# word_count.py
import pprint
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
globals()['sparkSessionSingletonInstance'] = SparkSession\
.builder\
.config(conf=sparkConf)\
.getOrCreate()
return globals()['sparkSessionSingletonInstance']
# Convertimos los RDD en el DStream a DataFrame para ejecutar alguna funcion de SQL
def process(time, rdd):
print("========= %s =========" % str(time))
try:
# Get the singleton instance of SparkSession
spark = getSparkSessionInstance(rdd.context.getConf())
# Convert RDD[String] to RDD[Row] to DataFrame
row_rdd = rdd.map(lambda w: Row(word=w))
words_data_frame = spark.createDataFrame(row_rdd)
# Creates a temporary view using the DataFrame.
words_data_frame.createOrReplaceTempView("words")
# Do word count on table using SQL and print it
word_counts_data_frame = \
spark.sql("select word, count(*) as total from words group by word")
word_counts_data_frame.show()
except:
pass
def count_words():
# Definimos el numero de cores que necesitamos, como este ejemplo corre local
# ocupamos el local[n] que permite dedicar n numero de hilos al input DStream,
# al receiver y al procesamiento.
sc = SparkContext("local[2]")
# definimos el intervalo de tiempo de stream (cada segundo generaremos)
# batches de DStream
streaming_context = StreamingContext(sc, 1)
# Creamos un input DStream de tipo socket -basico- con el endpoint
# localhost = 127.0.0.1 en el puerto 9999
lines = streaming_context.socketTextStream("localhost", 9999)
# de cada DStream que obtenemos queremos dividir en palabras para poder contarlas
words = lines.flatMap(lambda line: line.split(" "))
print(type(words))
# Procesamos los DStream haciendo un conteo al estilo map/reduce
pairs = words.map(lambda word: (word, 1))
word_counts = pairs.reduceByKey(lambda x, y: x + y)
# mostramos los primeros 10 elementos de cada RDD generado en el DStream en consola
word_counts.pprint()
#### Hasta este punto realmente no hemos ejecutado nada, solo hemos
# configurado todo lo que necesitamos para recibir y procesar el stream de datos
# para iniciar el procesamiento del streaming necesitamos hacer el llamada al
# metodo start() del StreamingContext
words.foreachRDD(process)
streaming_context.start()
streaming_context.awaitTermination()
if __name__ == "__main__":
count_words()Existen algoritmos específicos en el API de ml de spark para ser ocupados con streaming: Streaming Linear Regression, Streaming KMeans, aunque estos viven en la librería prohibida de mllib (╥﹏╥), también es posible que ocupar los modelos “tradicionales” entrenados en batch pero implementados para recibir datos de stream para pedicciones… aunque de nuevo este puente solo está implementado para la librería de mllib (╥﹏╥)
Los DStream (como los RDD) se pueden guardar en memoria utilizando persist() lo que hará que se guarden los RDD del DStream en memoria, esta estrategia se ocupa si los datos que están en el DStream se procesarán varias veces (varias operaciones diferentes sobre los mismos datos). Los DStream que se generan de operaciones basadas en ventanas son implicitamente guardados en memoria por lo que ya no es necesario ocupar el persist.
Para los datos que vienen de fuentes avanzadas la persistencia por default está asociada a tener un factor de replicación de 2, la persistencia por default de los DStream consiste en mantener serializado los datos en memoria -lo que puede afectar el desempeño de nuestra aplicación de streaming-
Una aplicación de stream debe operar 24/7 por lo que debe ser resilente a fallas no relacionadas a la lógica de la aplicación, para cumplir con esta propiedad Spark streaming requiere de hacer checkpoints a un sistema de almacenamiento tolerante a fallas con información suficiente para recuperarse de la falla. Existen 2 tipos de checkpoint en Spark Streaming:
Se requiere de utilizar checkpointing cuando:
updateStateByKey o reduceByKeyAndWindow con inverse function \(\rightarrow\) se debe proveer de un directorio para dejar el checkpointMuchas de las aplicaciones de Spark Streaming no ocupan checkpoints y eso esta bien! :P